{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Converting the Parquet data format to recordIO-wrapped protobuf\n",
    "\n",
    "---\n",
    "\n",
    "---\n",
    "## Contents\n",
    "\n",
    "1. [Introduction](#Introduction)\n",
    "1. [Optional data ingestion](#Optional-data-ingestion)\n",
    "    1. [Download the data](#Download-the-data)\n",
    "    1. [Convert into Parquet format](#Convert-into-Parquet-format)\n",
    "1. [Data conversion](#Data-conversion)\n",
    "    1. [Convert to recordIO protobuf format](#Convert-to-recordIO-protobuf-format)\n",
    "    1. [Upload to S3](#Upload-to-S3)\n",
    "1. [Training the linear model](#Training-the-linear-model)\n",
    "\n",
    "\n",
    "## Introduction\n",
    "In this notebook we illustrate how to convert a Parquet data format into the recordIO-protobuf format that many SageMaker algorithms consume. For the demonstration, first we'll convert the publicly available MNIST dataset into the Parquet format. Subsequently, it is converted into the recordIO-protobuf format and uploaded to S3 for consumption by the linear learner algorithm. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "isConfigCell": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "import io\n",
    "import re\n",
    "import boto3\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import time\n",
    "from sagemaker import get_execution_role\n",
    "\n",
    "role = get_execution_role()\n",
    "\n",
    "bucket = '<S3 bucket>'\n",
    "prefix = 'sagemaker/DEMO-parquet'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "!conda install -y -c conda-forge fastparquet scikit-learn"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Optional data ingestion"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Download the data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "import pickle, gzip, numpy, urllib.request, json\n",
    "\n",
    "# Load the dataset\n",
    "urllib.request.urlretrieve(\"http://deeplearning.net/data/mnist/mnist.pkl.gz\", \"mnist.pkl.gz\")\n",
    "with gzip.open('mnist.pkl.gz', 'rb') as f:\n",
    "    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from fastparquet import write\n",
    "from fastparquet import ParquetFile\n",
    "\n",
    "def save_as_parquet_file(dataset, filename, label_col):\n",
    "    X = dataset[0]\n",
    "    y = dataset[1]\n",
    "    data = pd.DataFrame(X)\n",
    "    data[label_col] = y\n",
    "    data.columns = data.columns.astype(str) #Parquet expexts the column names to be strings\n",
    "    write(filename, data)\n",
    "    \n",
    "def read_parquet_file(filename):\n",
    "    pf = ParquetFile(filename)\n",
    "    return pf.to_pandas()\n",
    "\n",
    "def features_and_target(df, label_col):\n",
    "    X = df.loc[:, df.columns != label_col].values\n",
    "    y = df[label_col].values\n",
    "    return [X, y]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convert into Parquet format"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "trainFile = 'train.parquet'\n",
    "validFile = 'valid.parquet'\n",
    "testFile = 'test.parquet'\n",
    "label_col = 'target'\n",
    "\n",
    "save_as_parquet_file(train_set, trainFile, label_col)\n",
    "save_as_parquet_file(valid_set, validFile, label_col)\n",
    "save_as_parquet_file(test_set, testFile, label_col)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Data conversion\n",
    "\n",
    "Since algorithms have particular input and output requirements, converting the dataset is also part of the process that a data scientist goes through prior to initiating training. E.g., the Amazon SageMaker implementation of Linear Learner takes recordIO-wrapped protobuf. Most of the conversion effort is handled by the Amazon SageMaker Python SDK, imported as `sagemaker` below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "dfTrain = read_parquet_file(trainFile)\n",
    "dfValid = read_parquet_file(validFile)\n",
    "dfTest = read_parquet_file(testFile)\n",
    "\n",
    "train_X, train_y = features_and_target(dfTrain, label_col)\n",
    "valid_X, valid_y = features_and_target(dfValid, label_col)\n",
    "test_X, test_y = features_and_target(dfTest, label_col)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convert to recordIO protobuf format"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import io\n",
    "import numpy as np\n",
    "import sagemaker.amazon.common as smac\n",
    "\n",
    "trainVectors = np.array([t.tolist() for t in train_X]).astype('float32')\n",
    "trainLabels = np.where(np.array([t.tolist() for t in train_y]) == 0, 1, 0).astype('float32')\n",
    "\n",
    "bufTrain = io.BytesIO()\n",
    "smac.write_numpy_to_dense_tensor(bufTrain, trainVectors, trainLabels)\n",
    "bufTrain.seek(0)\n",
    "\n",
    "\n",
    "validVectors = np.array([t.tolist() for t in valid_X]).astype('float32')\n",
    "validLabels = np.where(np.array([t.tolist() for t in valid_y]) == 0, 1, 0).astype('float32')\n",
    "\n",
    "bufValid = io.BytesIO()\n",
    "smac.write_numpy_to_dense_tensor(bufValid, validVectors, validLabels)\n",
    "bufValid.seek(0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Upload to S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import boto3\n",
    "import os\n",
    "\n",
    "key = 'recordio-pb-data'\n",
    "boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(bufTrain)\n",
    "s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)\n",
    "print('uploaded training data location: {}'.format(s3_train_data))\n",
    "\n",
    "boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation', key)).upload_fileobj(bufValid)\n",
    "s3_validation_data = 's3://{}/{}/validation/{}'.format(bucket, prefix, key)\n",
    "print('uploaded validation data location: {}'.format(s3_validation_data))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Training the linear model\n",
    "\n",
    "Once we have the data preprocessed and available in the correct format for training, the next step is to actually train the model using the data. Since this data is relatively small, it isn't meant to show off the performance of the Linear Learner training algorithm, although we have tested it on multi-terabyte datasets.\n",
    "\n",
    "This example takes four to six minutes to complete. Majority of the time is spent provisioning hardware and loading the algorithm container since the dataset is small.\n",
    "\n",
    "First, let's specify our containers.  Since we want this notebook to run in all 4 of Amazon SageMaker's regions, we'll create a small lookup.  More details on algorithm containers can be found in [AWS documentation](https://docs-aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from sagemaker.amazon.amazon_estimator import get_image_uri\n",
    "container = get_image_uri(boto3.Session().region_name, 'linear-learner')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "linear_job = 'DEMO-linear-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "\n",
    "print(\"Job name is:\", linear_job)\n",
    "\n",
    "linear_training_params = {\n",
    "    \"RoleArn\": role,\n",
    "    \"TrainingJobName\": linear_job,\n",
    "    \"AlgorithmSpecification\": {\n",
    "        \"TrainingImage\": container,\n",
    "        \"TrainingInputMode\": \"File\"\n",
    "    },\n",
    "    \"ResourceConfig\": {\n",
    "        \"InstanceCount\": 1,\n",
    "        \"InstanceType\": \"ml.c4.2xlarge\",\n",
    "        \"VolumeSizeInGB\": 10\n",
    "    },\n",
    "    \"InputDataConfig\": [\n",
    "        {\n",
    "            \"ChannelName\": \"train\",\n",
    "            \"DataSource\": {\n",
    "                \"S3DataSource\": {\n",
    "                    \"S3DataType\": \"S3Prefix\",\n",
    "                    \"S3Uri\": \"s3://{}/{}/train/\".format(bucket, prefix),\n",
    "                    \"S3DataDistributionType\": \"FullyReplicated\"\n",
    "                }\n",
    "            },\n",
    "            \"CompressionType\": \"None\",\n",
    "            \"RecordWrapperType\": \"None\"\n",
    "        },\n",
    "        {\n",
    "            \"ChannelName\": \"validation\",\n",
    "            \"DataSource\": {\n",
    "                \"S3DataSource\": {\n",
    "                    \"S3DataType\": \"S3Prefix\",\n",
    "                    \"S3Uri\": \"s3://{}/{}/validation/\".format(bucket, prefix),\n",
    "                    \"S3DataDistributionType\": \"FullyReplicated\"\n",
    "                }\n",
    "            },\n",
    "            \"CompressionType\": \"None\",\n",
    "            \"RecordWrapperType\": \"None\"\n",
    "        }\n",
    "\n",
    "    ],\n",
    "    \"OutputDataConfig\": {\n",
    "        \"S3OutputPath\": \"s3://{}/{}/\".format(bucket, prefix)\n",
    "    },\n",
    "    \"HyperParameters\": {\n",
    "        \"feature_dim\": \"784\",\n",
    "        \"mini_batch_size\": \"200\",\n",
    "        \"predictor_type\": \"binary_classifier\",\n",
    "        \"epochs\": \"10\",\n",
    "        \"num_models\": \"32\",\n",
    "        \"loss\": \"absolute_loss\"\n",
    "    },\n",
    "    \"StoppingCondition\": {\n",
    "        \"MaxRuntimeInSeconds\": 60 * 60\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's kick off our training job in SageMaker's distributed, managed training, using the parameters we just created. Because training is managed (AWS handles spinning up and spinning down hardware), we don't have to wait for our job to finish to continue, but for this case, let's setup a while loop so we can monitor the status of our training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "sm = boto3.Session().client('sagemaker')\n",
    "sm.create_training_job(**linear_training_params)\n",
    "\n",
    "status = sm.describe_training_job(TrainingJobName=linear_job)['TrainingJobStatus']\n",
    "print(status)\n",
    "sm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=linear_job)\n",
    "if status == 'Failed':\n",
    "    message = sm.describe_training_job(TrainingJobName=linear_job)['FailureReason']\n",
    "    print('Training failed with the following error: {}'.format(message))\n",
    "    raise Exception('Training job failed')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sm.describe_training_job(TrainingJobName=linear_job)['TrainingJobStatus']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Environment (conda_python3)",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.  Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
